/*
 * Wazuh Vulnerability scanner
 * Copyright (C) 2015, Wazuh Inc.
 * March 25, 2023.
 *
 * This program is free software; you can redistribute it
 * and/or modify it under the terms of the GNU General Public
 * License (version 2) as published by the FSF - Free Software
 * Foundation.
 */

#include "vulnerabilityScannerFacade.hpp"
#include "agentReScanListException.hpp"
#include "archiveHelper.hpp"
#include "defs.h"
#include "loggerHelper.h"
#include "messageBuffer_generated.h"
#include "scanOrchestrator.hpp"
#include "wazuh_modules/vulnerability_scanner/src/policyManager/policyManager.hpp"
#include "wdbDataException.hpp"
#include "xzHelper.hpp"
#include <string>

// Socket path used by the report socket client.
constexpr auto DEFAULT_QUEUE_PATH {"queue/sockets/queue"};

// Report dispatcher: queue location and bulk size.
constexpr auto REPORTS_QUEUE_PATH {"queue/vd/reports"};
constexpr auto REPORTS_BULK_SIZE {1};

// Event dispatcher: queue location and bulk size.
constexpr auto EVENTS_QUEUE_PATH {"queue/vd/event"};
constexpr auto EVENTS_BULK_SIZE {1};

// Microseconds in one second; used to turn an events-per-second limit into a per-report delay.
constexpr auto MICROSEC_FACTOR {1000000};

// Packaged database archive (XZ) and the intermediate TAR produced while decompressing it.
constexpr auto COMPRESSED_DB_PATH {"tmp/vd_1.0.0_vd_4.10.0.tar.xz"};
constexpr auto DECOMPRESSED_DB_PATH {"tmp/vd_1.0.0_vd_4.10.0.tar"};

// On-disk locations.
constexpr auto VD_STATE_QUEUE_PATH {"queue/vd/state_track"};
constexpr auto VD_KEYSTORE_PATH {"queue/keystore"};
constexpr auto VD_DATABASE_PATH {"queue/vd"};

// Keys stored in the state tracking database.
constexpr auto VD_DATABASE_VERSION_KEY {"installed_content"};
constexpr auto VD_STATE_KEY {"vulnerability_scanner_previous_state"};
constexpr auto VD_MANAGER_STATE_KEY {"manager_scan_previous_state"};
constexpr auto CLUSTER_NAME_KEY {"cluster_previous_name"};
constexpr auto CLUSTER_NODE_NAME_KEY {"cluster_node_name"};
constexpr auto CLUSTER_STATE_KEY {"cluster_previous_state"};

// Values stored under the state keys above.
constexpr auto DISABLED {"disabled"};
constexpr auto ENABLED {"enabled"};

bool VulnerabilityScannerFacade::decompressDatabase(std::string_view databaseVersion) const
{
    bool ret = false;

    // Check database version. It will attempt to decompress the database
    // if the version does not match or the state_track does not have information
    if ((databaseVersion.compare(__ossec_version) != 0 || databaseVersion.empty()) && std::filesystem::exists(TMP_DIR))
    {
        // Check for XZ compressed file.
        if (!std::filesystem::exists(COMPRESSED_DB_PATH))
        {
            logDebug2(WM_VULNSCAN_LOGTAG,
                      "No database compressed file found at '%s'. Skipping decompression.",
                      COMPRESSED_DB_PATH);
            return ret;
        }

        logInfo(WM_VULNSCAN_LOGTAG, "Starting database file decompression.");
        logDebug2(WM_VULNSCAN_LOGTAG, "Starting XZ file decompression.");

        // Clean up possible trash files.
        std::filesystem::remove_all(DECOMPRESSED_DB_PATH);

        // Decompress XF file format.
        Utils::XzHelper(std::filesystem::path(COMPRESSED_DB_PATH), std::filesystem::path(DECOMPRESSED_DB_PATH))
            .decompress();

        // Clean up feed database.
        std::filesystem::remove_all(DATABASE_PATH);

        // Extract queue/vd and queue/vd_updater
        std::vector<std::string> extractOnly;
        extractOnly.emplace_back(VD_DATABASE_PATH);

        // Decompress also keystore if missing.
        if (!std::filesystem::exists(VD_KEYSTORE_PATH))
        {
            extractOnly.emplace_back(VD_KEYSTORE_PATH);
        }

        logDebug2(WM_VULNSCAN_LOGTAG, "Starting TAR file decompression.");

        // Decompress TAR file format.
        Utils::ArchiveHelper::decompress(DECOMPRESSED_DB_PATH, m_shouldStop, "", extractOnly);

        // Clean up.
        std::filesystem::remove_all(DECOMPRESSED_DB_PATH);

        if (!m_shouldStop.load())
        {
            ret = true;
            logInfo(WM_VULNSCAN_LOGTAG, "Database decompression finished.");
        }
    }

    return ret;
}

void VulnerabilityScannerFacade::initAlertReportDispatcher()
{
    const auto alertsMaxEps = PolicyManager::instance().getAlertsMaxEventsPerSecond();
    const auto reportsWait = alertsMaxEps > 0 ? MICROSEC_FACTOR / alertsMaxEps : 0;

    m_reportSocketClient =
        std::make_shared<SocketClient<Socket<OSPrimitives, NoHeaderProtocol>, EpollWrapper>>(DEFAULT_QUEUE_PATH);
    m_reportSocketClient->connect(
        [](const char*, uint32_t, const char*, uint32_t)
        {
            // Not used
        },
        []()
        {
            // Not used
        },
        SOCK_DGRAM);
    m_reportDispatcher = std::make_shared<ReportDispatcher>(
        [this, reportsWait](std::queue<std::string>& dataQueue)
        {
            while (!dataQueue.empty())
            {
                const auto& data = dataQueue.front();
                m_reportSocketClient->send(data.c_str(), data.size());
                // We wait to keep the maximum number of events per second
                if (reportsWait > 0)
                {
                    std::this_thread::sleep_for(std::chrono::microseconds(reportsWait));
                }
                logDebug2(WM_VULNSCAN_LOGTAG, "Report sent: %s", data.c_str());
                dataQueue.pop();
            }
        },
        {.dbPath = REPORTS_QUEUE_PATH, .bulkSize = REPORTS_BULK_SIZE});
}

/**
 * @brief Start the event dispatcher worker.
 *
 * Creates the scan orchestrator and registers the worker callback of the event
 * dispatcher. The callback verifies the FlatBuffers envelope of the element at the
 * front of the queue and, when valid, hands it to the orchestrator. Elements that
 * fail verification are not processed.
 *
 * Exceptions raised while processing are handled as follows:
 *  - WdbDataException: the event is pushed to the delayed dispatcher for that agent.
 *  - AgentReScanListException: a re-scan is pushed to the delayed dispatcher for each listed agent.
 *  - nlohmann::json::exception / std::exception: the error is logged with the event payload.
 */
void VulnerabilityScannerFacade::initEventDispatcher()
{
    // Init Orchestrator
    auto scanOrchestrator = std::make_shared<ScanOrchestrator>(
        m_indexerConnector, m_databaseFeedManager, m_reportDispatcher, m_internalMutex);

    m_eventDispatcher->startWorker(
        // coverity[copy_constructor_call]
        [scanOrchestrator](std::queue<rocksdb::PinnableSlice>& dataQueue)
        {
            // Extracts the raw payload of an event for logging purposes.
            const auto parseEventMessage = [](const rocksdb::PinnableSlice& element) -> std::string
            {
                const auto eventMessageBuffer = GetMessageBuffer(element.data());

                // FlatBuffers accessors return nullptr for a field that is absent from the buffer
                // (presumably `data` is not marked as required in the schema), so the payload
                // pointer must be checked before it is dereferenced.
                if (eventMessageBuffer && eventMessageBuffer->data())
                {
                    const auto payload = eventMessageBuffer->data();
                    return std::string(payload->begin(), payload->end());
                }

                return "unable to parse";
            };

            const auto& element = dataQueue.front();
            try
            {
                // Only well-formed MessageBuffer envelopes are processed.
                if (flatbuffers::Verifier verifier(reinterpret_cast<const uint8_t*>(element.data()), element.size());
                    VerifyMessageBufferBuffer(verifier))
                {
                    scanOrchestrator->processEvent(element);
                }
            }
            catch (const WdbDataException& e)
            {
                // Retry the same event later for the affected agent.
                logDebug2(WM_VULNSCAN_LOGTAG, "WdbDataException (Agent %s). Reason: %s", e.agentId().c_str(), e.what());
                scanOrchestrator->pushEventToDelayedDispatcher(element, e.agentId());
            }
            catch (const AgentReScanListException& e)
            {
                // Schedule a delayed re-scan for every agent reported by the exception.
                logDebug2(WM_VULNSCAN_LOGTAG, "AgentReScanListException. Reason: %s", e.what());
                for (const auto& agentData : e.agentList())
                {
                    scanOrchestrator->pushReScanToDelayedDispatcher(agentData.id, e.noIndex());
                }
            }
            catch (const nlohmann::json::exception& e)
            {
                logError(WM_VULNSCAN_LOGTAG,
                         "VulnerabilityScannerFacade::initEventDispatcher: json exception (%d) - Event message: %s",
                         e.id,
                         parseEventMessage(element).c_str());
            }
            catch (const std::exception& e)
            {
                logError(WM_VULNSCAN_LOGTAG,
                         "VulnerabilityScannerFacade::initEventDispatcher: %s - Event message: %s",
                         e.what(),
                         parseEventMessage(element).c_str());
            }
        });
}

/**
 * @brief Subscribe to the syscollector delta events.
 *
 * Every message received on the "deltas-syscollector" topic is pushed to the
 * event queue as a DBSync buffer.
 */
void VulnerabilityScannerFacade::initDeltasSubscription()
{
    // Forwards each delta message to the event queue.
    const auto onDeltaMessage = [this](const std::vector<char>& deltaMessage)
    {
        pushEvent(deltaMessage, BufferType::BufferType_DBSync);
    };

    m_syscollectorDeltasSubscription =
        std::make_unique<RouterSubscriber>("deltas-syscollector", "vulnerability_scanner_deltas");
    // coverity[copy_constructor_call]
    m_syscollectorDeltasSubscription->subscribe(onDeltaMessage);
}

/**
 * @brief Subscribe to the syscollector rsync events.
 *
 * Every message received on the "rsync-syscollector" topic is pushed to the
 * event queue as an RSync buffer.
 */
void VulnerabilityScannerFacade::initRsyncSubscription()
{
    // Forwards each rsync message to the event queue.
    const auto onRsyncMessage = [this](const std::vector<char>& rsyncMessage)
    {
        pushEvent(rsyncMessage, BufferType::BufferType_RSync);
    };

    m_syscollectorRsyncSubscription =
        std::make_unique<RouterSubscriber>("rsync-syscollector", "vulnerability_scanner_rsync");
    // coverity[copy_constructor_call]
    m_syscollectorRsyncSubscription->subscribe(onRsyncMessage);
}

/**
 * @brief Subscribe to the Wazuh DB agent events.
 *
 * Every message received on the "wdb-agent-events" topic is pushed to the
 * event queue as a JSON buffer.
 */
void VulnerabilityScannerFacade::initWazuhDBEventSubscription()
{
    // Forwards each agent event to the event queue.
    const auto onAgentEvent = [this](const std::vector<char>& agentEvent)
    {
        pushEvent(agentEvent, BufferType::BufferType_JSON);
    };

    m_wdbAgentEventsSubscription =
        std::make_unique<RouterSubscriber>("wdb-agent-events", "vulnerability_scanner_database");
    m_wdbAgentEventsSubscription->subscribe(onAgentEvent);
}

/**
 * @brief Detect enable/disable transitions of the module and of the manager scan.
 *
 * Compares the current configuration with the values previously stored in @p stateDB,
 * updates m_agentsAction / m_managerAction accordingly and stores the current values.
 * The manager state is neither evaluated nor stored while the module is disabled.
 *
 * @param stateDB State tracking database holding the previous states.
 */
void VulnerabilityScannerFacade::vulnerabilityScanPolicyChange(Utils::RocksDBWrapper& stateDB) const
{
    // --- Module state -------------------------------------------------------
    const bool moduleEnabled = PolicyManager::instance().isVulnerabilityDetectionEnabled();
    const std::string moduleState = moduleEnabled ? ENABLED : DISABLED;

    // disabled -> enabled: every agent must be scanned again.
    std::string moduleLastState;
    if (stateDB.get(VD_STATE_KEY, moduleLastState) && moduleEnabled && moduleLastState == DISABLED)
    {
        logDebug1(WM_VULNSCAN_LOGTAG, "Vulnerability scanner module was re-enabled (agents re-scan needed)");
        m_agentsAction = ActionWrapper::Action::SCAN;
    }
    stateDB.put(VD_STATE_KEY, moduleState);

    // With the module disabled the manager state is irrelevant.
    if (!moduleEnabled)
    {
        return;
    }

    // --- Manager scan state -------------------------------------------------
    const bool managerEnabled = !PolicyManager::instance().getManagerDisabledScan();
    const std::string managerState = managerEnabled ? ENABLED : DISABLED;

    if (std::string managerLastState; stateDB.get(VD_MANAGER_STATE_KEY, managerLastState))
    {
        const bool wasEnabled = managerLastState == ENABLED;
        const bool wasDisabled = managerLastState == DISABLED;

        if (wasEnabled && !managerEnabled)
        {
            // enabled -> disabled: remove the manager data.
            logDebug1(WM_VULNSCAN_LOGTAG, "Vulnerability scanner in manager disabled (manager clean-up needed)");
            m_managerAction = ActionWrapper::Action::CLEANUP;
        }
        else if (wasDisabled && managerEnabled)
        {
            // disabled -> enabled: scan the manager again.
            logDebug1(WM_VULNSCAN_LOGTAG, "Vulnerability scanner in manager enabled (manager re-scan needed)");
            m_managerAction = ActionWrapper::Action::SCAN;
        }
        else if (wasDisabled && !managerEnabled)
        {
            // disabled -> disabled: hard none action, can't be changed.
            logDebug1(WM_VULNSCAN_LOGTAG, "Vulnerability scanner in manager still disabled");
            m_managerAction = ActionWrapper::Action::HARD_NONE;
        }
    }
    stateDB.put(VD_MANAGER_STATE_KEY, managerState);
}

/**
 * @brief Detect changes in the cluster configuration (name, status and node name).
 *
 * Each current value is compared with the one previously stored in @p stateDB and then
 * stored. A change of cluster name or status requests a re-scan of the agents; a change
 * of node name requests a re-scan of the manager, unless the manager action is already
 * CLEANUP or HARD_NONE.
 *
 * @param stateDB State tracking database holding the previous cluster values.
 */
void VulnerabilityScannerFacade::clusterConfigurationChange(Utils::RocksDBWrapper& stateDB) const
{
    // True only when a previous value exists for the key and it differs from the current one.
    const auto differsFromStored = [&stateDB](const char* key, const auto& currentValue) -> bool
    {
        std::string storedValue;
        return stateDB.get(key, storedValue) && storedValue.compare(currentValue) != 0;
    };

    // Cluster name.
    const auto clusterName = PolicyManager::instance().getClusterName();
    if (differsFromStored(CLUSTER_NAME_KEY, clusterName))
    {
        logDebug1(WM_VULNSCAN_LOGTAG, "Cluster name changed (agents re-scan needed)");
        m_agentsAction = ActionWrapper::Action::SCAN;
    }
    stateDB.put(CLUSTER_NAME_KEY, clusterName);

    // Cluster status (enabled/disabled).
    const auto clusterState = PolicyManager::instance().getClusterStatus() ? ENABLED : DISABLED;
    if (differsFromStored(CLUSTER_STATE_KEY, clusterState))
    {
        logDebug1(WM_VULNSCAN_LOGTAG, "Cluster was enabled/disabled (agents re-scan needed)");
        m_agentsAction = ActionWrapper::Action::SCAN;
    }
    stateDB.put(CLUSTER_STATE_KEY, clusterState);

    // Cluster node name.
    const auto& clusterNodeName = PolicyManager::instance().getClusterNodeName();
    const bool nodeNameChanged = differsFromStored(CLUSTER_NODE_NAME_KEY, clusterNodeName);

    // If the manager scan is disabled (CLEANUP or HARD_NONE) there is nothing to re-scan.
    if (nodeNameChanged && m_managerAction != ActionWrapper::Action::CLEANUP &&
        m_managerAction != ActionWrapper::Action::HARD_NONE)
    {
        logDebug1(WM_VULNSCAN_LOGTAG, "Cluster node name changed (manager re-scan needed)");
        m_managerAction = ActionWrapper::Action::SCAN;
    }
    stateDB.put(CLUSTER_NODE_NAME_KEY, clusterNodeName);
}

void VulnerabilityScannerFacade::handlePolicyChanges() const
{
    const auto pushActionData = [this](const nlohmann::json& actionData)
    {
        logDebug2(WM_VULNSCAN_LOGTAG, "actionData: %s", actionData.dump().c_str());

        const auto& actionDataString = actionData.dump();
        const std::vector<char> message(actionDataString.begin(), actionDataString.end());

        pushEvent(message, BufferType::BufferType_JSON);
    };

    if (m_agentsAction == ActionWrapper::Action::SCAN)
    {
        logInfo(WM_VULNSCAN_LOGTAG, "Policy changed. Performing re-scan over all agents.");

        nlohmann::json actionData;
        actionData["action"] = "reboot";
        // We shouldn't index if we are in a cluster environment
        actionData["no-index"] = PolicyManager::instance().getClusterStatus();

        pushActionData(actionData);
    }
    else
    {
        logDebug2(WM_VULNSCAN_LOGTAG, "No policy has changed or no action is needed for the agents.");
    }

    // If we already trigger the re-scan over the agents, we don't need to do it again for the manager.
    if (m_managerAction == ActionWrapper::Action::SCAN && m_agentsAction != ActionWrapper::Action::SCAN)
    {
        logInfo(WM_VULNSCAN_LOGTAG, "Policy changed. Performing re-scan over the manager.");

        nlohmann::json actionData;
        actionData["action"] = "scanAgent";
        actionData["agent_info"]["agent_id"] = "000";

        pushActionData(actionData);
    }
    else if (m_managerAction == ActionWrapper::Action::CLEANUP)
    {
        logInfo(WM_VULNSCAN_LOGTAG, "Policy changed. Performing clean-up over the manager.");

        nlohmann::json actionData;
        actionData["action"] = "deleteAgent";
        actionData["agent_info"]["agent_id"] = "000";

        pushActionData(actionData);
    }
    else
    {
        logDebug2(WM_VULNSCAN_LOGTAG, "No policy has changed or no action is needed for the manager.");
    }
}

// LCOV_EXCL_START
/**
 * @brief Start the vulnerability scanner module.
 *
 * Initializes logging and the policy manager, evaluates policy and cluster changes
 * against the state tracking database, creates the indexer connector (when enabled),
 * the report and event dispatchers and the router subscriptions, restores the packaged
 * database when needed, creates the database feed manager and finally starts the event
 * dispatcher worker. If the module is disabled the function returns right after the
 * policy check. Exceptions are caught and logged; none is propagated to the caller.
 *
 * @param logFunction Logging callback assigned to the logger and passed to the indexer connector.
 * @param configuration Module configuration used to initialize the policy manager.
 * @param noWaitToStop Stored in m_noWaitToStop (read by stop()).
 * @param reloadGlobalMapsStartup Passed to the database feed manager; it also enables the
 *                                re-scan triggered after a content update.
 * @param initContentUpdater Passed to the database feed manager.
 */
void VulnerabilityScannerFacade::start(
    const std::function<void(
        const int, const std::string&, const std::string&, const int, const std::string&, const std::string&, va_list)>&
        logFunction,
    const nlohmann::json& configuration,
    const bool noWaitToStop,
    const bool reloadGlobalMapsStartup,
    const bool initContentUpdater)
{
    try
    {
        m_noWaitToStop = noWaitToStop;

        // Initialize logging
        Log::assignLogFunction(logFunction);

        // Policy manager initialization.
        auto& policyManager = PolicyManager::instance();
        policyManager.initialize(configuration);

        // Create a unique pointer to a RocksDBWrapper instance for managing state information.
        auto stateDB = std::make_unique<Utils::RocksDBWrapper>(VD_STATE_QUEUE_PATH);

        SocketDBWrapper::instance().init();

        // Check the policy for the vulnerability scanner
        vulnerabilityScanPolicyChange(*stateDB);

        // Return if the module is disabled.
        if (!policyManager.isVulnerabilityDetectionEnabled())
        {
            logInfo(WM_VULNSCAN_LOGTAG, "Vulnerability scanner module is disabled.");
            return;
        }

        // Check the cluster configuration
        clusterConfigurationChange(*stateDB);

        // Indexer connector initialization. The configuration is queried once and passed directly.
        if (policyManager.isIndexerEnabled())
        {
            m_indexerConnector =
                std::make_shared<IndexerConnector>(policyManager.getIndexerConfiguration(), logFunction);
        }

        // Socket client initialization to send vulnerability reports.
        initAlertReportDispatcher();

        m_eventDispatcher = std::make_shared<EventDispatcher>(EVENTS_QUEUE_PATH, EVENTS_BULK_SIZE);

        // Checks for the actions to be performed after the policy change (vulnerability scanner).
        handlePolicyChanges();

        // Subscription to syscollector delta events.
        initDeltasSubscription();

        // Subscription to syscollector rsync events.
        initRsyncSubscription();

        // Wazuh DB event subscription.
        initWazuhDBEventSubscription();

        // Query the current database version. It stays empty when the key is not stored yet.
        std::string databaseVersion;
        if (stateDB->get(VD_DATABASE_VERSION_KEY, databaseVersion))
        {
            logDebug1(WM_VULNSCAN_LOGTAG, "Database version: %s", databaseVersion.c_str());
        }

        // Decompress database content.
        if (decompressDatabase(databaseVersion) && !m_shouldStop.load())
        {
            // Record the installed version so the archive is not decompressed again.
            stateDB->put(VD_DATABASE_VERSION_KEY, __ossec_version);

            // Cleanup
            std::filesystem::remove_all(COMPRESSED_DB_PATH);

            logDebug1(WM_VULNSCAN_LOGTAG, "Updated %s key of %s.", VD_DATABASE_VERSION_KEY, VD_STATE_QUEUE_PATH);
        }

        // Database feed manager initialization.
        m_databaseFeedManager = std::make_shared<DatabaseFeedManager>(
            m_indexerConnector,
            m_shouldStop,
            m_internalMutex,
            true,
            reloadGlobalMapsStartup,
            initContentUpdater,
            [this, reloadGlobalMapsStartup]()
            {
                // Re-scan all agent after content update, only if is an instance of vulnerability scanner.
                if (reloadGlobalMapsStartup)
                {
                    nlohmann::json actionData;
                    actionData["action"] = "reboot";
                    // We shouldn't index if we are in a cluster environment
                    actionData["no-index"] = PolicyManager::instance().getClusterStatus();

                    const std::string actionDataString = actionData.dump();
                    const std::vector<char> actionMessage(actionDataString.begin(), actionDataString.end());

                    pushEvent(actionMessage, BufferType::BufferType_JSON);
                    logInfo(WM_VULNSCAN_LOGTAG, "Triggered a re-scan after content update.");
                }
            });

        // Add subscribers for policy updates.
        policyManager.addSubscriber(m_databaseFeedManager);

        // Event dispatcher initialization.
        initEventDispatcher();

        logInfo(WM_VULNSCAN_LOGTAG, "Vulnerability scanner module started.");
    }
    catch (const std::exception& e)
    {
        logError(WM_VULNSCAN_LOGTAG, "VulnerabilityScannerFacade::start: %s.", e.what());
    }
    catch (...)
    {
        logError(WM_VULNSCAN_LOGTAG, "VulnerabilityScannerFacade::start: Unknown exception.");
    }
}
// LCOV_EXCL_STOP

/**
 * @brief Stop the module and release its resources.
 *
 * Raises the stop flag (only when m_noWaitToStop is set), notifies the waiters on
 * m_retryWait, joins the worker threads and then releases the components and
 * singletons in a fixed order.
 */
void VulnerabilityScannerFacade::stop()
{
    // Joins a thread only when it is joinable.
    const auto joinIfJoinable = [](auto& worker)
    {
        if (worker.joinable())
        {
            worker.join();
        }
    };

    // Atomic flag section
    if (m_noWaitToStop)
    {
        m_shouldStop.store(true);
    }

    // Notify every thread waiting on the retry condition.
    m_retryWait.notify_all();

    // Threads join
    joinIfJoinable(m_rebootThread);
    joinIfJoinable(m_managerThread);

    // Release the indexer connector, the feed manager and the router subscriptions.
    m_indexerConnector.reset();
    m_databaseFeedManager.reset();
    m_syscollectorRsyncSubscription.reset();
    m_syscollectorDeltasSubscription.reset();
    m_wdbAgentEventsSubscription.reset();

    // Policy manager teardown, followed by the dispatchers.
    PolicyManager::instance().teardown();
    m_reportDispatcher.reset();
    m_eventDispatcher.reset();

    // Destroy socketDbWrapper
    SocketDBWrapper::instance().teardown();
}
